iT邦幫忙

2025 iThome 鐵人賽

DAY 26
0
Software Development

用 FastAPI 打造你的 AI 服務系列 第 26

[Day 26] 實戰範例 (二):影片處理 SaaS (下)

  • 分享至 

  • xImage
  •  

在上一篇文章中,我們快速搭建了一個影片處理服務的原型。儘管它能夠正常運作,但無論是基於記憶體的即時處理,還是採用 BackgroundTasks 的磁碟方案,都暴露出明顯的架構瓶頸:所有任務都被限制在單一服務實例中執行,不僅併發處理能力受限,更缺乏企業級的任務管理與容錯機制。

今天,我們將進行一次架構層面的重大升級,引入業界成熟的分散式任務佇列系統——Celery,徹底重構我們的服務架構。本次升級的核心目標包括:

  • 替換 BackgroundTasks:將影片處理遷移至獨立的 Celery Worker 程序,實現真正的分散式處理
  • 實現可靠的任務控制:善用 Celery 強大的任務管理機制,提供優雅的任務取消與重試功能
  • 採用磁碟暫存策略:運用最穩健的磁碟儲存方案,確保大檔案處理的可靠性
  • 探索進階架構設計:深入討論平行處理模式及其在 AI 應用場景中的實務考量

之前在 Day 17Day 18 有簡單介紹過 Celery,如果不熟悉的話歡迎回去複習XD

磁碟暫存 vs 記憶體暫存的抉擇

在處理大型影片檔案時,我們面臨著一個關鍵的架構決策:應該將上傳的檔案暫存在記憶體中,還是直接寫入磁碟?

記憶體暫存的雙面刃特性

記憶體暫存方案的最大優勢在於其無與倫比的存取速度。記憶體的讀寫效能遠超磁碟 I/O,這使得小檔案的處理極為高效。同時,這種方案在實作上相對簡潔,開發者無需處理檔案系統權限、磁碟空間管理等複雜問題,且檔案不會在磁碟上留下痕跡,特別適合處理敏感資料的場景。

然而,記憶體暫存的限制在大檔案處理場景中會被放大。大型影片檔案(動輒數 GB)會對系統記憶體造成巨大壓力,當多個用戶同時上傳檔案時,很容易觸發記憶體不足錯誤(OOM),導致整個服務崩潰。更根本的問題是記憶體資源的有限性,這使得此方案難以應對高併發或超大檔案的商業場景。

磁碟暫存的企業級選擇

相較之下,磁碟暫存提供了更為穩健的解決方案。磁碟空間通常遠大於可用記憶體,能夠從容處理任意大小的檔案。更重要的是,通過串流方式讀寫檔案,系統的記憶體佔用量能始終保持在可控的低水平,即使處理數十 GB 的影片檔案也不會對系統穩定性造成威脅。

磁碟暫存的另一個顯著優勢是其優秀的併發特性。多個任務可以同時處理各自的檔案而不會互相影響,這為水平擴展奠定了基礎。此外,磁碟暫存具備良好的容錯性,即使系統意外重啟,磁碟上的檔案仍然可以繼續處理,不會造成用戶資料的遺失。

當然,磁碟暫存也有其考量之處。磁碟 I/O 的速度確實不如記憶體存取,但對於影片處理這類 CPU 密集型任務而言,這種差異的影響相當有限。另一個需要注意的是暫存檔案的生命週期管理,必須建立完善的清理機制,避免磁碟空間的洩漏。

最終選擇:磁碟暫存

綜合考量穩定性、擴展性與實際部署需求,我們選擇磁碟暫存方案。這個決策基於以下理由:

  1. SaaS 服務特性:面向多用戶的商業服務必須能穩定處理各種規模的檔案
  2. 資源效率:避免因記憶體不足導致的服務中斷,提供更可預測的服務品質
  3. 運維友善:磁碟暫存搭配適當的清理策略,更便於監控與維護

雖然磁碟 I/O 會帶來些微的效能開銷,但相較於系統穩定性和可擴展性的收益,這個取捨是非常值得的。

架構升級:從 BackgroundTasks 邁向 Celery

Celery 是一套成熟的分散式任務佇列系統,基於訊息傳遞架構設計。它的核心組件協同運作,形成一個強大的非同步處理生態系統:

  • Producer (任務生產者):我們的 FastAPI 應用程式,負責將任務發布到佇列中
  • Broker (訊息代理):可靠的中間件(如 Redis 或 RabbitMQ),負責儲存和路由任務訊息
  • Consumer (任務消費者):獨立運行的 Celery Worker 程序,負責從佇列中領取並執行任務
  • Result Backend (結果後端):持久化儲存任務的執行狀態和結果資料,同樣可採用 Redis

1. 定義 Celery 任務

影片處理的邏輯現在被定義為一個 Celery Task。bind=True 允許我們在任務內部存取 self,從而可以更新狀態和檢查是否被撤銷。

# tasks.py
import os
import time
from celery import Celery
from celery.exceptions import Ignore

# 使用 Redis 作為 Broker 和 Backend
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task(bind=True)
def process_video_task(self, temp_file_path: str):
    """Celery Worker 執行的影片處理任務"""
    try:
        total_frames = 100
        for i in range(total_frames):
            # 檢查是否接收到撤銷指令

            result = AsyncResult(self.request.id, app=app)
            if result.state == 'REVOKED':
                # 清理資源後,拋出 Ignore 使任務狀態變為 REVOKED
                if os.path.exists(temp_file_path):
                    os.remove(temp_file_path)
                raise Ignore()

            # ... 模擬影片處理 ...
            time.sleep(0.5)

            # 使用 update_state 更新自定義的進度元數據
            self.update_state(state="PROGRESS", meta={"progress": (i + 1)})

        # 處理完成
        return {"status": "Completed", "result_path": "..."}
    finally:
        # 確保無論如何暫存檔都會被刪除
        if os.path.exists(temp_file_path):
            os.remove(temp_file_path)

2. FastAPI 端點的改造

FastAPI 現在只負責接收檔案、儲存並發布任務,然後就可以立即回應。

# main.py
import os
import uuid
import shutil
from fastapi import FastAPI, UploadFile, File
from celery.result import AsyncResult
from tasks import process_video_task

app = FastAPI()
TEMP_DIR = "."  # 或其他適合的暫存目錄

@app.post("/upload/celery")
async def upload_video_celery(file: UploadFile = File(...)):
    task_id = str(uuid.uuid4())
    temp_file_path = os.path.join(TEMP_DIR, f"{task_id}_{file.filename}")

    # 以串流方式將檔案寫入本地
    with open(temp_file_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)

    # 異步發布任務到 Celery,並立即獲得 task_id
    task = process_video_task.delay(temp_file_path)
    return {"task_id": task.id}

@app.post("/tasks/{task_id}/cancel")
async def cancel_task(task_id: str):
    # 發送撤銷指令,需要指定 Celery app 實例
    result = AsyncResult(task_id, app=celery_app)
    result.revoke(terminate=True)
    return {"message": "已請求取消任務"}

3. API 端點:SSE 進度追蹤

# main.py
import asyncio
import json
from starlette.responses import StreamingResponse
from celery.result import AsyncResult
from tasks import process_video_task, app as celery_app # 導入 celery app 實例

async def progress_generator(task_id: str):
    """
    一個非同步生成器,用於從 Celery Result Backend 查詢進度並推播。
    """
    # 透過 task_id 取得 AsyncResult 物件,app=celery_app 確保使用正確的 backend 配置
    result = AsyncResult(task_id, app=celery_app)

    while not result.ready():
        # result.ready() 會在任務完成、失敗或被撤銷時返回 True
        
        state = result.state
        progress = 0
        
        # 我們在 Celery Task 中使用 'PROGRESS' 作為自定義的進行中狀態
        if state == 'PROGRESS':
            # result.info 是我們在 update_state 中存入的 meta 字典
            progress = result.info.get('progress', 0)
        elif state == 'PENDING':
            # 任務還在佇列中等待被 Worker 領取
            state = 'PENDING'
            progress = 0
        elif state == 'STARTED':
            # 任務已被 Worker 領取,但還未回報進度
            state = 'STARTED'
            progress = 0
        
        # 建立 JSON 格式的回應資料
        response_data = {
            "state": state,
            "progress": progress
        }
        # 遵循 SSE 格式: "data: <json_string>\n\n"
        yield f"data: {json.dumps(response_data)}\n\n"
        
        # 每秒查詢一次狀態,避免過於頻繁地請求 Redis
        await asyncio.sleep(1)

    # 當迴圈結束,代表任務已完成,發送最後一次的最終狀態
    final_state = {
        "state": result.state,
        "progress": 100,
    }
    # 如果任務成功,可以附帶結果
    if result.successful():
        final_state["result"] = result.get()
    elif result.failed():
        final_state["error"] = str(result.info) if result.info else "任務執行失敗"
    
    yield f"data: {json.dumps(final_state)}\n\n"

# SSE 端點需要修改為從 Celery Result Backend 查詢狀態
@app.get("/progress/{task_id}")
async def get_progress(task_id: str):
    """
    提供 SSE 端點,讓前端可以即時追蹤 Celery 任務的進度。
    """
    # StreamingResponse 會保持連線,並持續調用 progress_generator 來發送資料
    return StreamingResponse(progress_generator(task_id), media_type="text/event-stream")

優化討論:進階架構思考

引入 Celery 後,我們得以思考更複雜、更高效的處理架構。

1. 前端處理 Frame 的邊界

一種有趣的設計是讓前端(瀏覽器)負責將影片拆分成 Frame,後端只接收並處理單張圖片。

  • 特色:後端任務極度簡化(原子化),非常適合 Serverless 架構;前端可以實現非常精確的進度回報。
  • 限制:這會給客戶端帶來巨大的 CPU 和記憶體負擔,並產生驚人的網路開銷(數千次 HTTP 請求及倍增的傳輸資料量),同時還丟失了音訊軌道。總體而言,這種設計在絕大多數 Web 應用中是不切實際且不推薦的。

2. 伺服器端的平行化處理

一個更專業、更強大的架構是在後端實現平行處理。利用 Celery 的工作流(Canvas)功能,我們可以這樣設計:

  1. FastAPI 接收影片,啟動一個「主任務 (Master Task)」。
  2. 「主任務」在伺服器上用 ffmpeg 將影片拆分成數千個 Frame。
  3. 「主任務」為每一 Frame 或每一批 Frame,發布一個「子任務 (Child Task)」。
  4. Celery 會將成千上萬的「子任務」自動分發給所有空閒的 Worker 進行大規模平行處理。
  5. 所有「子任務」完成後,一個「聚合任務」負責將處理好的 Frame 重新合成為最終影片。

這種架構能最大限度地利用運算資源,極大地縮短大型任務的處理時間。

3. AI 應用中的限制

上述的平行處理架構有一個重要前提:每個 Frame 的處理是獨立的,例如套用濾鏡、單幀物件偵測等。

然而,在許多高階 AI 應用中,任務需要上下文資訊 (Context),無法這樣完全平行化。例如:

  • 行為辨識 (Action Recognition):AI 需要觀察連續多幀的變化才能判斷一個動作是「走路」還是「跑步」。
  • 物件追蹤 (Object Tracking):AI 需要知道一個物體在上一幀的位置,才能預測它在下一幀的位置。
  • 影片摘要 (Video Summarization):AI 需要理解整個影片的時序關係才能生成摘要。

在這些情境下,雖然仍可做一定程度的平行化(例如,將影片切成有重疊的片段處理),但無法像簡單濾鏡那樣將每一幀完全獨立對待。

小結

透過引入 Celery 分散式任務佇列,我們成功將影片處理服務從單體原型升級為企業級分散式系統。我們不僅實現了可靠的任務管理機制,更探討了磁碟暫存策略與平行處理架構的實務考量,為構建高可靠 AI 服務奠定了重要的技術基礎。


上一篇
[Day 25] 實戰範例 (二):影片處理 SaaS (上)
系列文
用 FastAPI 打造你的 AI 服務26
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言